package com.bizmda.bizsip.app.listener;

import cn.hutool.core.text.StrFormatter;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.bizmda.bizsip.app.executor.AbstractAppExecutor;
import com.bizmda.bizsip.common.*;
import com.bizmda.bizsip.app.config.AppServiceMapping;
import com.bizmda.bizsip.app.config.RabbitmqConfig;
import com.bizmda.bizsip.service.AppLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * RabbitMQ接收服务
 */
@Slf4j
@Service
public class DelayAppServiceQueueListener {
    @Autowired
    private AppServiceMapping appServiceMapping;
    @Autowired
    private AppLogService appLogService;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RabbitmqConfig.DELAY_SERVICE_QUEUE, durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = RabbitmqConfig.DELAY_SERVICE_EXCHANGE, type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = RabbitmqConfig.DELAY_SERVICE_ROUTING_KEY))
    public void doService(Message message) {
        Map<String,Object> map = (Map<String,Object>)jackson2JsonMessageConverter.fromMessage(message);
        String serviceId = (String)map.get("serviceId");
        int retryCount = (int)map.get("retryCount");
        List<Integer> delayMillisecondList = (List<Integer>)map.get("delayMilliseconds");
        if (delayMillisecondList == null) {
            delayMillisecondList = new ArrayList<>();
        }
        JSONObject jsonObject = (JSONObject)JSONUtil.parse(map.get("bizmessage"));
        BizMessage<JSONObject> bizMessage = new BizMessage<>(jsonObject);
        BizMessage<JSONObject> inMessage = BizTools.copyBizMessage(bizMessage);
        log.debug("收到延迟App服务[{}]: 第{}次，延迟时间[{}]",serviceId,retryCount+1,delayMillisecondList);
        log.trace("收到延迟App服务消息:\n{}",BizUtils.buildBizMessageLog(bizMessage));

        BizMessage<JSONObject> outBizMessage;
        AbstractAppExecutor appExecutor = this.appServiceMapping.getAppExecutor(serviceId);
        if (appExecutor == null) {
            log.error("延迟App服务[{}]不存在",serviceId);
            outBizMessage = BizMessage.buildFailMessage(bizMessage
                    ,new BizException(BizResultEnum.APP_SERVICE_NOT_FOUND
                    ,StrFormatter.format("延迟App服务[{}]不存在",serviceId)));
            log.debug("发送交易失败日志");
            this.appLogService.sendAppFailLog(inMessage,outBizMessage);
            return;
        }
        TmContext tmContext = new TmContext();
        tmContext.setRetryCount(retryCount+1);

        BizTools.tmContextThreadLocal.set(tmContext);
        BizTools.bizMessageThreadLocal.set(bizMessage);

        log.debug("调用延迟App服务[{}]",serviceId);
        try {
            outBizMessage = appExecutor.doAppService(bizMessage);
            if (outBizMessage.getCode() != 0) {
                throw new BizException(outBizMessage);
            }
            log.trace("延迟App服务响应报文:\n{}",BizUtils.buildJsonLog(outBizMessage));
        } catch (BizException e) {
            if (e.getCode() != BizResultEnum.RETRY_DELAY_APP_SERVICE.getCode()) {
                outBizMessage = BizMessage.buildFailMessage(bizMessage, e);
                log.debug("发送交易失败日志");
                this.appLogService.sendAppFailLog(inMessage, outBizMessage);
                return;
            }
            // 聚合服务返回超时的处理
            retryCount ++;
            if (retryCount >= delayMillisecondList.size()) {
                // 重试次数超限
                outBizMessage = BizMessage.buildFailMessage(bizMessage, new BizException(BizResultEnum.APP_SERVICE_MAXIMUM_RETRY_ERROR));
                log.debug("发送交易失败日志");
                this.appLogService.sendAppFailLog(inMessage, outBizMessage);
                return;
            }
            int delayMillisecond = delayMillisecondList.get(retryCount);
            map.put("retryCount",retryCount);
            log.debug("调用延迟App服务[{}]:第{}次，延迟时间[{}ms]",serviceId,retryCount+1,delayMillisecond);
            log.trace("延迟App服务返回报文:\n{}",BizUtils.buildBizMessageLog(bizMessage));

            this.rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_SERVICE_EXCHANGE, RabbitmqConfig.DELAY_SERVICE_ROUTING_KEY, map,
                    new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) {
                            //设置消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            message.getMessageProperties().setDelay(delayMillisecond);
                            return message;
                        }
                    });
            outBizMessage = BizMessage.buildFailMessage(bizMessage, new BizException(BizResultEnum.RETRY_DELAY_APP_SERVICE));
            log.debug("发送交易挂起日志");
            this.appLogService.sendAppSuspendLog(inMessage, outBizMessage);
            return;
        } finally {
            BizTools.tmContextThreadLocal.remove();
            BizTools.bizMessageThreadLocal.remove();
        }

        log.debug("发送交易成功日志");
        this.appLogService.sendAppSuccessLog(inMessage,outBizMessage);
        return;
    }
}
